package com.ld.curator.api;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @author:ld
 * @create:2020-04-30 14:23
 * @description:
 */
public class CuratorWatcher {

    //客户端对象
    private CuratorFramework client;
    //连接地址
    private final String IP = "127.0.0.1:2181";

    /**
     * 获取连接
     */
    @Before
    public void before(){
        //重试机制，即每隔1秒，重试一次，一共可以重试3次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
        //方式一：获取客户端对象
//        client = CuratorFrameworkFactory.newClient(IP, 5000, 3000, retryPolicy);
        //方式二：获取客户端对象
        client = CuratorFrameworkFactory
                .builder()
                .connectString(IP)//连接地址
                .connectionTimeoutMs(3000) //连接超时时间
                .sessionTimeoutMs(5000) //会话超时时间
                .retryPolicy(retryPolicy) //重试机制
                .namespace("base") //命名空间，即指定根节点，后续操作都在根节点下。注意：开头不要加 “/”
                .build();


        //开启连接
        client.start();
    }

    /**
     * 关闭连接
     */
    @After
    public void close(){
        client.close();
    }

    /**
     * 监听节点创建，数据变化
     */
    @Test
    public void nodeCache() throws Exception {
        //创建节点缓存对象
        NodeCache nodeCache = new NodeCache(client, "/node1", false);
        //开启
        nodeCache.start();
        //添加监听器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点数据变化，节点最新数据为：" + new String(nodeCache.getCurrentData().getData()));
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * 监听子节点变化，新增子节点，删除子节点，修改子节点
     * @throws Exception
     */
    @Test
    public void pathChildrenCache() throws Exception {
        //创建children缓存对象
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node1", true);
        //开启缓存，初始化后
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        //添加监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                switch (pathChildrenCacheEvent.getType()){
                    case CHILD_ADDED: //添加节点
                        System.out.println("添加节点，" + pathChildrenCacheEvent.getData().getPath());
                        break;
                    case CHILD_UPDATED: //更新节点
                        System.out.println("更新节点，" + pathChildrenCacheEvent.getData().getPath() +
                                ",节点数据：" + new String(pathChildrenCacheEvent.getData().getData()));
                        break;
                    case CHILD_REMOVED: //删除节点
                        System.out.println("删除节点，" + pathChildrenCacheEvent.getData().getPath());
                        break;
                    default:
                        break;
                }
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }
}
